# pylint: disable=wrong-import-position
import utime
from micropython import const
from trezorui import Display
from typing import TYPE_CHECKING

from trezor import io, log, loop, utils, wire, workflow
from trezor.messages import ButtonAck, ButtonRequest
from trezor.wire import context
from trezor.wire.protocol_common import Context
from trezorui_api import (
    AttachType,
    BacklightLevels,
    LayoutState,
    backlight_fade,
    backlight_set,
)

if utils.USE_POWER_MANAGER:
    from trezor.power_management.autodim import autodim_clear

if TYPE_CHECKING:
    from typing import Any, Callable, Generator, Generic, Iterator, TypeVar

    from trezor.enums import ButtonRequestType
    from trezorui_api import LayoutObj, UiResult  # noqa: F401

    T = TypeVar("T", covariant=True)
    ButtonRequestMsg = tuple[ButtonRequestType, str] | None
else:
    T = 0
    Generic = {T: object}


if __debug__:
    from trezorui_api import disable_animation

    disable_animation(utils.DISABLE_ANIMATION)


# all rendering is done through a singleton of `Display`
display = Display()

# re-export constants from modtrezorui
WIDTH: int = Display.WIDTH
HEIGHT: int = Display.HEIGHT

_REQUEST_ANIMATION_FRAME = const(1)
"""Animation frame timer token.
See `trezor::ui::layout::base::EventCtx::ANIM_FRAME_TIMER`.
"""

_UNRESPONSIVE_WARNING_TIMEOUT_MS = const(2000)


# allow only one alert at a time to avoid alerts overlapping
_alert_in_progress = False


async def _alert(count: int) -> None:
    short_sleep = loop.sleep(20)
    long_sleep = loop.sleep(80)
    for i in range(count * 2):
        if i % 2 == 0:
            backlight_set(BacklightLevels.MAX)
            await short_sleep
        else:
            backlight_set(BacklightLevels.DIM)
            await long_sleep
    backlight_set(BacklightLevels.NORMAL)
    global _alert_in_progress
    _alert_in_progress = False


def alert(count: int = 3) -> None:
    if utils.USE_BACKLIGHT:
        global _alert_in_progress
        if _alert_in_progress:
            return

        _alert_in_progress = True
        loop.schedule(_alert(count))


async def _waiting_screen() -> None:
    from trezor import TR
    from trezor.ui.layouts import show_wait_text

    await loop.sleep(_UNRESPONSIVE_WARNING_TIMEOUT_MS)
    show_wait_text(TR.words__comm_trouble)


class Shutdown(Exception):
    pass


CURRENT_LAYOUT: "Layout | ProgressLayout | None" = None


def set_current_layout(layout: "Layout | ProgressLayout | None") -> None:
    """Set the current global layout.

    All manipulation of the global `CURRENT_LAYOUT` MUST go through this function.
    It ensures that the transitions are always to/from None (so that there are never
    two layouts in RUNNING state), and that the debug UI is notified of the change.
    """
    global CURRENT_LAYOUT

    # all transitions must be to/from None
    assert (CURRENT_LAYOUT is None) == (layout is not None)

    CURRENT_LAYOUT = layout


if utils.USE_POWER_MANAGER:

    def _handle_power_button_press() -> None:
        """Handle power button press event during firmware operation."""
        from apps.common.lock_manager import notify_suspend

        notify_suspend()


class Layout(Generic[T]):
    """Python-side handler and runner for the Rust based layouts.

    Wrap a `LayoutObj` instance in `Layout` to be able to display the layout, run its
    event loop, and take part in global layout management. See
    [docs/core/misc/layout-lifecycle.md] for details.
    """

    if __debug__:

        @staticmethod
        def _trace(layout: LayoutObj) -> str:
            tokens = []

            def callback(*args: str) -> None:
                tokens.extend(args)

            layout.trace(callback)
            return "".join(tokens)

        def __str__(self) -> str:
            return f"{repr(self)}({self._trace(self.layout)[:150]})"

        @staticmethod
        def notify_debuglink(layout: "Layout | None") -> None:
            from apps.debug import notify_layout_change

            notify_layout_change(layout)

    def __init__(self, layout: LayoutObj[T]) -> None:
        """Set up a layout."""
        self.layout = layout
        self.tasks: set[loop.Task] = set()
        self.timers: dict[int, loop.Task] = {}
        self.result_box: loop.mailbox[Any] = loop.mailbox()
        self.button_request_ack_pending: bool = False
        self.button_request_box: loop.mailbox[ButtonRequestMsg] = loop.mailbox()
        self.button_request_task: loop.Task | None = None
        self.transition_out: AttachType | None = None
        self.backlight_level = BacklightLevels.NORMAL
        self.context: Context | None = None
        self.state: LayoutState = LayoutState.INITIAL

        # Indicates whether we should use Resume attach style when launching.
        # Homescreen layouts can override this.
        self.should_resume = False

    def is_ready(self) -> bool:
        """True if the layout is in READY state."""
        return CURRENT_LAYOUT is not self and self.result_box.is_empty()

    def is_running(self) -> bool:
        """True if the layout is in RUNNING state."""
        return CURRENT_LAYOUT is self

    def is_finished(self) -> bool:
        """True if the layout is in FINISHED state."""
        return CURRENT_LAYOUT is not self and not self.result_box.is_empty()

    def is_layout_attached(self) -> bool:
        return self.state is LayoutState.ATTACHED

    def start(self) -> None:
        """Start the layout, stopping any other RUNNING layout.

        If the layout is already RUNNING, do nothing. If the layout is FINISHED, fail.
        """
        # do nothing if we are already running
        if self.is_running():
            return

        # make sure we are not restarted before picking the previous result
        assert self.is_ready()

        transition_in = AttachType.RESUME if self.should_resume else AttachType.INITIAL

        # set up the global layout, shutting down any competitors
        # (caller should still call `workflow.close_others()` to ensure that someone
        # else will not just shut us down immediately)
        if CURRENT_LAYOUT is not None:
            prev_layout = CURRENT_LAYOUT
            prev_layout.stop()
            transition_in = prev_layout.transition_out

        assert CURRENT_LAYOUT is None
        # do not notify debuglink, we will do it when we receive an ATTACHED event
        set_current_layout(self)

        # save context
        self.context = context.CURRENT_CONTEXT

        # attach a timer callback and paint self
        self._event(self.layout.attach_timer_fn, self._set_timer, transition_in)

        # spawn all tasks
        for task in self.create_tasks():
            self._start_task(task)

    def stop(self, _close_all: bool = True) -> None:
        """Stop the layout, moving out of RUNNING state and unsetting self as the
        current layout.

        The resulting state is either READY (if there is no result to be picked up) or
        FINISHED.

        When called externally, this kills any tasks that wait for the result, assuming
        that the external `stop()` is a kill. When called internally, `_close_all` is
        set to False to indicate that a result became available and that the taker
        should be allowed to pick it up.
        """
        # stop all running timers and spawned tasks
        for timer in self.timers.values():
            loop.close(timer)

        not_closed = set()
        for task in self.tasks:
            if not _close_all and task is self.button_request_task:
                # Keep `ButtonRequest` handler alive.
                # It will be awaited and closed in `get_result()`.
                not_closed.add(task)
                continue
            if task != loop.this_task:
                loop.close(task)
        self.timers.clear()
        self.tasks = not_closed

        self.transition_out = self.layout.get_transition_out()

        # shut down anyone who is waiting for the result
        if _close_all:
            self.result_box.maybe_close()

        if CURRENT_LAYOUT is self:
            # fade to black -- backlight is off while no layout is running
            backlight_fade(BacklightLevels.NONE)

            set_current_layout(None)
            if __debug__:
                if self.button_request_ack_pending:
                    msg = "button request ack pending"
                    if utils.USE_THP:
                        # Don't raise to avoid THP desync
                        log.error(__name__, msg)
                    else:
                        raise wire.FirmwareError(msg)
                self.notify_debuglink(None)

    async def get_result(self) -> T:
        """Wait for, and return, the result of this UI layout."""
        if self.is_ready():
            self.start()
        # else we are (a) still running or (b) already finished
        is_done = None
        try:
            if self.context is not None and self.result_box.is_empty():
                is_done = loop.mailbox()  # (see below)
                self.button_request_task = self._handle_button_requests(is_done)
                self._start_task(self.button_request_task)

            result = await self.result_box
            assert CURRENT_LAYOUT is None  # the screen is blank now

            if is_done is not None:
                # Make sure ButtonRequest is ACKed, before the result is returned.
                # Otherwise, THP channel may become desynced (due to two consecutive writes).
                self.button_request_box.put(None, replace=True)
                task = loop.spawn(_waiting_screen())
                try:
                    await is_done
                finally:
                    task.close()

            return result
        finally:
            # Close all tasks (including ButtonRequest handler)
            self.stop()

    def request_complete_repaint(self) -> None:
        """Request a complete repaint of the layout."""
        msg = self.layout.request_complete_repaint()
        assert msg is None

    def repaint(self) -> None:
        """Repaint the layout. Forces drawing at this moment.

        Useful for recovering from an externally cleared screen, such as on resume
        from suspend.
        """
        self.request_complete_repaint()
        self.layout.paint()
        backlight_fade(self.backlight_level)

    def _event(self, event_call: Callable[..., LayoutState | None], *args: Any) -> None:
        """Process an event coming out of the Rust layout. Set is as a result and shut
        down the layout if appropriate, do nothing otherwise."""
        if __debug__ and CURRENT_LAYOUT is not self:
            raise wire.FirmwareError("layout received an event but it is not running")

        first_paint = False
        state = event_call(*args)
        self.transition_out = self.layout.get_transition_out()

        if state is LayoutState.DONE:
            self._emit_message(self.layout.return_value())

        elif state is LayoutState.ATTACHED:
            first_paint = True
            self.button_request_ack_pending = self._button_request()
            if self.button_request_ack_pending:
                state = LayoutState.TRANSITIONING
            elif __debug__:
                self.notify_debuglink(self)

        if state is not None:
            self.state = state

        if first_paint:
            self._first_paint()
        else:
            self._paint()

    def _button_request(self) -> bool:
        """Process a button request coming out of the Rust layout."""
        res = self.layout.button_request()
        if res is None:
            return False

        if self.context is None:
            return False

        if __debug__ and not self.button_request_box.is_empty():
            raise wire.FirmwareError(
                "button request already pending -- "
                "don't forget to yield your input flow from time to time ^_^"
            )

        # in production, we don't want this to fail, hence replace=True
        self.button_request_box.put(res, replace=True)
        return True

    def _paint(self) -> None:
        """Paint the layout and ensure that homescreen cache is properly invalidated."""
        import storage.cache as storage_cache

        painted = self.layout.paint()
        if storage_cache.homescreen_shown is not None and painted:
            storage_cache.homescreen_shown = None

    def _first_paint(self) -> None:
        """Paint the layout for the first time after starting it.

        This is a separate call in order for homescreens to be able to override and not
        paint when the screen contents are still valid.
        """
        self.repaint()

    def _set_timer(self, token: int, duration_ms: int) -> None:
        """Timer callback for Rust layouts."""

        async def timer_task() -> None:
            self.timers.pop(token)
            try:
                self._event(self.layout.timer, token)
            except Shutdown:
                pass

        if token == _REQUEST_ANIMATION_FRAME and token in self.timers:
            # do not schedule another animation frame if one is already scheduled
            return

        task = self.timers.get(token)
        if task is None:
            task = timer_task()
            self.timers[token] = task

        deadline = utime.ticks_add(utime.ticks_ms(), duration_ms)
        loop.schedule(task, deadline=deadline, reschedule=True)

    def _emit_message(self, msg: Any) -> None:
        """Process a message coming out of the Rust layout. Set is as a result and shut
        down the layout if appropriate, do nothing otherwise."""
        # when emitting a message, there should not be another one already waiting
        assert self.result_box.is_empty()
        self.stop(_close_all=False)
        self.result_box.put(msg)
        raise Shutdown()

    def create_tasks(self) -> Iterator[loop.Task]:
        """Set up background tasks for a layout.

        Called from `start()`. Creates and yields a list of background tasks, typically
        event handlers for different interfaces. Event handlers are enabled conditionally
        based on build options to prevent stale events in the event queue.

        Override and then `yield from super().create_tasks()` to add more tasks."""
        if utils.USE_BUTTON:
            yield self._handle_button_events()
        if utils.USE_TOUCH:
            yield self._handle_touch_events()
        if utils.USE_BLE:
            yield self._handle_ble_events()
        if utils.USE_POWER_MANAGER:
            yield self._handle_power_manager()

    if utils.USE_BUTTON:

        def _handle_button_events(self) -> Generator[Any, tuple[int, int], None]:
            """Task that is waiting for the user button input."""
            button = loop.wait(io.BUTTON)
            try:
                while True:
                    # Using `yield` instead of `await` to avoid allocations.
                    event = yield button
                    if utils.USE_POWER_MANAGER:
                        event_type, event_button = event
                        # check for POWER_BUTTON (2), BUTTON_UP (0)
                        if event_button == 2 and event_type == 0:
                            _handle_power_button_press()
                    workflow.idle_timer.touch()
                    self._event(self.layout.button_event, *event)
            except Shutdown:
                return
            finally:
                button.close()

    if utils.USE_TOUCH:

        def _handle_touch_events(self) -> Generator[Any, tuple[int, int, int], None]:
            """Task that is waiting for the user touch input."""
            touch = loop.wait(io.TOUCH)
            try:
                while True:
                    # Using `yield` instead of `await` to avoid allocations.
                    event = yield touch
                    workflow.idle_timer.touch()
                    if utils.USE_POWER_MANAGER:
                        autodim_clear()

                    self._event(self.layout.touch_event, *event)
            except Shutdown:
                return
            finally:
                touch.close()

    async def _handle_button_requests(self, is_done: loop.mailbox[None] | None) -> None:
        try:
            if self.context is None:
                return
            while True:
                # The following task will raise `UnexpectedMessageException` on any message.
                unexpected_read = self.context.read(())
                result = await loop.race(unexpected_read, self.button_request_box)
                if result is None:
                    return  # exit the loop when the layout is done.
                assert isinstance(result, tuple)
                br_code, br_name = result

                if __debug__:
                    log.info(__name__, "ButtonRequest sent: %s", br_name)
                await self.context.call(
                    ButtonRequest(
                        code=br_code, pages=self.layout.page_count(), name=br_name
                    ),
                    ButtonAck,
                )
                if __debug__:
                    log.info(__name__, "ButtonRequest acked: %s", br_name)

                if (
                    self.button_request_ack_pending
                    and self.state is LayoutState.TRANSITIONING
                ):
                    self.button_request_ack_pending = False
                    self.state = LayoutState.ATTACHED
                    if __debug__:
                        self.notify_debuglink(self)
        finally:
            if is_done is not None:
                is_done.put(None)

    if utils.USE_BLE:

        async def _handle_ble_events(self) -> None:
            blecheck = loop.wait(io.BLE_EVENT)
            try:
                while True:
                    event = await blecheck
                    if __debug__:
                        import trezorble as ble

                        log.debug(
                            __name__,
                            "BLE event: %s, state: %s",
                            event,
                            ",".join(ble.connection_flags()),
                        )
                    self._event(self.layout.ble_event, *event)
            except Shutdown:
                return

    if utils.USE_POWER_MANAGER:

        def _handle_power_manager(self) -> Generator[Any, int, None]:
            pm = loop.wait(io.PM_EVENT)
            try:
                while True:
                    flags = yield pm
                    if flags & io.pm.EVENT_USB_CONNECTED_CHANGED:
                        # disconnecting from charger restarts autodim/autolock timer
                        # connecting to charger clears autodim state
                        workflow.idle_timer.touch()
                        autodim_clear()
                    self._event(self.layout.pm_event, flags)
            except Exception:
                raise
            finally:
                pm.close()

    def _task_finalizer(self, task: loop.Task, value: Any) -> None:
        if value is None:
            # all is good
            if __debug__:
                log.debug(__name__, "UI task exited by itself: %s", task)
            return

        if isinstance(value, GeneratorExit):
            if __debug__:
                log.debug(__name__, "UI task was stopped: %s", task)
            return

        if isinstance(value, BaseException):
            if __debug__ and value.__class__.__name__ != "UnexpectedMessageException":
                log.error(
                    __name__, "UI task died: %s (%s)", task, value.__class__.__name__
                )
            try:
                self._emit_message(value)
            except Shutdown:
                pass
            return

        if __debug__:
            log.error(__name__, "UI task returned non-None: %s (%s)", task, value)

    def _start_task(self, task: loop.Task) -> None:
        self.tasks.add(task)
        loop.schedule(task, finalizer=self._task_finalizer)

    def __del__(self) -> None:
        self.layout.__del__()


class ProgressLayout:
    """Progress layout.

    Simplified version of the general Layout object, for the purpose of showing spinners
    and loaders that are shown "in the background" of a running workflow. Does not run
    background tasks, does not respond to timers.

    Participates in global layout management. This is to track whether the progress bar
    is currently displayed, who needs to redraw and when.
    """

    def __init__(self, layout: LayoutObj[UiResult]) -> None:
        self.layout = layout
        self.transition_out = None
        self.value = 0
        self.progress_step = 20

    def is_layout_attached(self) -> bool:
        return True

    def report(self, value: int, description: str | None = None) -> None:
        """Report a progress step.

        Starts the layout if it is not running.

        `value` can be in range from 0 to 1000.
        """
        if CURRENT_LAYOUT is not self:
            self.start()

        workflow.idle_timer.touch()

        if utils.DISABLE_ANIMATION:
            return

        def do_progress_event(val: int) -> None:
            msg = self.layout.progress_event(val, description or "")
            assert msg is None
            self.layout.paint()

        # animate the progress bar in a blocking fashion
        step = min(self.progress_step, max(value - self.value, 1))
        last_value = self.value
        for v in range(self.value, min(value, 1000) + 1, step):
            do_progress_event(v)
            last_value = v
        if value >= 1000 and last_value != 1000:
            do_progress_event(1000)
        self.value = value

    def start(self) -> None:
        if CURRENT_LAYOUT is not self and CURRENT_LAYOUT is not None:
            CURRENT_LAYOUT.stop()

        assert CURRENT_LAYOUT is None
        set_current_layout(self)

        self.repaint()

    def stop(self) -> None:
        if CURRENT_LAYOUT is self:
            set_current_layout(None)

    def repaint(self) -> None:
        """Repaint the layout. Forces drawing at this moment.

        Useful for recovering from an externally cleared screen, such as on resume
        from suspend.
        """
        self.layout.request_complete_repaint()
        self.layout.paint()
        backlight_fade(BacklightLevels.NORMAL)
